package cn.doitedu.rtdw.data_sync;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class S01_SyncJob_UserInfoTable2Hbase {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 创建业务库的 ums_member 表 的cdc连接器表
        // 挑选一些关心的维度字段：  id,username,phone,status,create_time,gender,province,city,job
        tEnv.executeSql("CREATE TABLE ums_member_source (    " +
                "      id INT,                                      " +
                "      username STRING,                             " +
                "      phone STRING,                                " +
                "      status int,                                  " +
                "      create_time timestamp(3),                    " +
                "      gender int,                                  " +
                "      birthday date,                               " +
                "      province STRING,                             " +
                "      city STRING,                                 " +
                "      job STRING ,                                 " +
                "      source_type INT ,                            " +
                "     PRIMARY KEY (id) NOT ENFORCED            " +
                "     ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',               " +
                "     'hostname' = 'doitedu'   ,               " +
                "     'port' = '3306'          ,               " +
                "     'username' = 'root'      ,               " +
                "     'password' = 'root'      ,               " +
                "     'database-name' = 'realtimedw',          " +
                "     'table-name' = 'ums_member'              " +
                ")");


        // 创建hbase连接器sink表
        tEnv.executeSql("CREATE TABLE ums_member_hbasesink( " +
                " username STRING, " +
                " f ROW<id INT,phone STRING, status INT, create_time TIMESTAMP(3), gender INT, birthday DATE, province STRING, city STRING, job STRING, source_type INT>, " +
                " PRIMARY KEY (username) NOT ENFORCED " +
                ") WITH (                             " +
                " 'connector' = 'hbase-2.2',          " +
                " 'table-name' = 'dim_user_info',     " +
                " 'zookeeper.quorum' = 'doitedu:2181' " +
                ")");

        // 将数据，写入hbase表
        tEnv.executeSql("insert into ums_member_hbasesink " +
                "select username,ROW(id,phone,status,create_time,gender,birthday,province,city,job,source_type) from ums_member_source");
    }

}
